ISpout
概述
核心接口(interface),负责将数据发送到topology中去处理
Storm会跟踪Spout发出去的tuple的DAG
ack/fail
tuple: message id
ack/fail/nextTuple是在同一个线程中执行的,所以不用考虑线程安全方面
核心方法
open: 初始化操作
close: 资源释放操作
nextTuple: 发送数据 core api
ack: tuple处理成功,storm会反馈给spout一个成功消息
fail:tuple处理失败,storm会发送一个消息给spout,处理失败
实现类
1 | public abstract class BaseRichSpout extends BaseComponent implements IRichSpout { |
IComponent接口
概述:
public interface IComponent extends Serializable
为topology中所有可能的组件提供公用的方法
void declareOutputFields(OutputFieldsDeclarer declarer);
用于声明当前Spout/Bolt发送的tuple的名称
使用OutputFieldsDeclarer配合使用
实现类:
1 | public abstract class BaseComponent implements IComponent |
IBolt接口
概述
职责:接收tuple处理,并进行相应的处理(filter/join/….)
hold住tuple再处理
IBolt会在一个运行的机器上创建,使用Java序列化它,然后提交到主节点(nimbus)上去执行
nimbus会启动worker来反序列化,调用prepare方法,然后才开始处理tuple处理
方法
prepare:初始化
execute:处理一个tuple数据,tuple对象中包含了元数据信息
cleanup:shutdown之前的资源清理操作
实现类:
1 | public abstract class BaseRichBolt extends BaseComponent implements IRichBolt { |
求和案例
需求:1 + 2 + 3 + …. = ???
实现方案:Spout发送数字作为input
使用Bolt来处理业务逻辑:求和
将结果输出到控制台拓扑设计: DataSourceSpout –> SumBolt
1 | import java.util.Map; |
词频统计
需求:读取指定目录的数据,并实现单词计数功能
实现方案:Spout来读取指定目录的数据,作为后续Bolt处理的input
使用一个Bolt把input的数据,切割开,我们按照逗号进行分割
使用一个Bolt来进行最终的单词的次数统计操作
并输出拓扑设计: DataSourceSpout ==> SplitBolt ==> CountBolt
1 | import java.io.File; |
Storm编程注意事项
1) Exception in thread “main” java.lang.IllegalArgumentException: Spout has already been declared for id DataSourceSpout
【不能bolt和spout命名一样】
2) org.apache.storm.generated.InvalidTopologyException: null
【还不能以双下划线开头命名】
3) Topology的名称不是重复: local似乎没问题, 等我们到集群测试的时候再来验证这个问题
【同时启用同一个Topology】